//purpose: bitmap management for lsv

//file created by zsy on 2017.03.01
// the component supports lsv and row2 as well, therfor use volume format to destingush how to write bitmap unit, on lsv its page level bitmap while row2 is chunk level.

#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include <sys/types.h>
#include <assert.h>
#include <stack.h>

#include "list.h"
#include "lsv_volume.h"
#include "lsv_bitmap.h"
#include "lsv_bitmap_snap.h"
#include "lsv_bitmap_internal.h"
#include "lsv_conf.h"
#include "lsv_help.h"
#include "lsv_log.h"
#include "lsv_rcache.h"

#include "lsv_bitmap_hash_cache.h"

#ifdef CACHE_NEW
#include "lsv_bitmap_cache_impl.h"
#endif

#define LSV_BITMAP_ENABLE_HASH_CACHE

#define OFFSET_TO_INDEX(x)                (( (x) / LSV_PAGE_SIZE) / ( BITMAP_CHUNK_SIZE /sizeof(struct lsv_bitmap_unit)) )
#define OFFSET_TO_RNDEX(x)                (( (x) / LSV_PAGE_SIZE) % ( BITMAP_CHUNK_SIZE /sizeof(struct lsv_bitmap_unit)) )

#define BITMAP_BUF_SIZE     BITMAP_CHUNK_SIZE

#define BITMAP_CACHE_SIZE   CHUNK_SIZE

/*static inline lsv_bitmap_get_dynamic_size(void *volume_context)
{

}*/

int lsv_bitmap_create_node(void *volume_context, struct lsv_bitmap_context **bitmap_context, uint8_t is_root, uint32_t type)
{
        int ret = 0;
        lsv_volume_proto_t *lsv_vol = (lsv_volume_proto_t *)volume_context;

        uint32_t header_size = round_up(sizeof(struct lsv_bitmap_header) + BITMAP_CHUNK_HEADER_SIZE(lsv_vol->size), CHUNK_SIZE);
        uint32_t bitmap_size = sizeof(struct lsv_bitmap_context);// + BITMAP_CHUNK_HEADER_SIZE(lsv_vol->size);

        LSV_DBUG("bitmap header size %lu %d \r\n", sizeof(struct lsv_bitmap_header) , header_size);

        *bitmap_context = xmalloc(bitmap_size);
        if(!*bitmap_context) {
                DFATAL("no memory.\r\n");
                return -ENOMEM; //no memory.
        }

        memset((*bitmap_context), 0, bitmap_size);
        (*bitmap_context)->bitmap_header = xmalloc(header_size);
        if(!(*bitmap_context)->bitmap_header) {
               goto err_free1;
        }

        memset((*bitmap_context)->bitmap_header, 0, header_size);

        (*bitmap_context)->header_size = header_size;
        (*bitmap_context)->volume_context = volume_context;

        //    (*bitmap_context)->bitmap_cache_chunk_id = -1;
        //    (*bitmap_context)->bitmap_cache = malloc(BITMAP_CACHE_SIZE);
        struct lsv_bitmap_header *pheader = (*bitmap_context)->bitmap_header;

        //if(header_size % CHUNK_SIZE)
        //        header_size += CHUNK_SIZE - (header_size % CHUNK_SIZE);

        memset(pheader, 0, header_size);

        strncpy(pheader->sign, is_root?LSV_BITMAP_SIGN:LSV_BITMAP_SNAP_SIGN, sizeof(pheader->sign)-1);
        pheader->version[0] = 0;
        pheader->version[1] = 1;

        pheader->active_chunk_id = 0;
        pheader->first_child_chunk_id = 0;
        pheader->next_chunk_id = 0;
        pheader->type_and_flag = type;
        pheader->chunk_map_count = NUM_OF_BITMAP_CHUNK(lsv_vol->size);
        pheader->create_time = (uint64_t)gettime();

        lsv_bitmap_init_lock(*bitmap_context);

        for(int i=0;i<header_size;i+= CHUNK_SIZE)
        {
                uint32_t chunk_id;
                int ret = lsv_bitmap_alloc_chunk(volume_context, &chunk_id, 0);

                if(ret || chunk_id == 0)
                {
                        DFATAL("allocate chunk failed. ret=%d\r\n", ret);
                        //fatal error.
                        goto err_free2;
                }

                LSV_DBUG("malloc chunk %u\n", chunk_id);

                pheader->header_chunk_ids[i/CHUNK_SIZE] = chunk_id;
        }

        //lsv_bitmap_cache_init(*bitmap_context);

#ifdef LSV_BITMAP_ENABLE_HASH_CACHE
        #ifdef CACHE_NEW
                void **c =&(*bitmap_context)->hash_cache;
                ret = cache_api->init((bm_cache_t *)c, "bm_hash_cache", volume_context);
                if (unlikely(ret))
                        UNIMPLEMENTED(__DUMP__);
        #else
                (*bitmap_context)->hash_cache = xmalloc(sizeof(lsv_hash_cache_context_t));
                ret = lsv_hash_cache_init((*bitmap_context)->hash_cache, "lsv", volume_context);
                LSV_DBUG("bitmap hash_cache init, %p\n", (*bitmap_context)->hash_cache);
                YASSERT((*bitmap_context)->hash_cache);
        #endif
#endif

        if(is_root)
                ((lsv_volume_proto_t *)volume_context)->bitmap_context = *bitmap_context;

        return ret;

err_free2:
        xfree((*bitmap_context)->bitmap_header);
err_free1:
        xfree(*bitmap_context);
        return -ENOMEM;
}

int lsv_bitmap_init(void *volume_context, uint32_t create_type, uint32_t flag)   //root initialization.
{
        int ret;
        struct lsv_bitmap_context *bitmap_context;
        lsv_volume_proto_t *lsv_vol = (lsv_volume_proto_t *)volume_context;

        if(create_type == LSV_SYS_CREATE)
        {
                ret = lsv_bitmap_create_node(volume_context, &bitmap_context, LSV_BITMAP_TRUE, LSV_BITMAP_SNAP_TYPE_VOLUME);

                if(ret)
                        return ret;
                else
                {
                        LSV_DBUG("update bitmap_chunk_id %u\n", bitmap_context->bitmap_header->header_chunk_ids[0]);
                        ((lsv_volume_proto_t *)volume_context)->u.bitmap_chunk_id = bitmap_context->bitmap_header->header_chunk_ids[0];

                        bitmap_context->is_loading = 1;
                        bitmap_context->flags = flag;
                        bitmap_context->bitmap_header->vvol_id = 1;
                        bitmap_context->bitmap_header->vvol_map[1].vol_id = ((lsv_volume_proto_t *)volume_context)->ino;

                        ret = bitmap_header_save(bitmap_context, SAVE_HEADER_LEVEL_ALL);
                        bitmap_context->is_loading = 0;

                        return ret;
                }
        }
        else
        {
                bitmap_context = (struct lsv_bitmap_context *)xmalloc(sizeof(struct lsv_bitmap_context) /*+ BITMAP_CHUNK_HEADER_SIZE(lsv_vol->size)*/);
                if(!bitmap_context)
                        return -ENOMEM; //no memory.

                memset(bitmap_context, 0, sizeof(struct lsv_bitmap_context));// + BITMAP_CHUNK_HEADER_SIZE(lsv_vol->size));
                bitmap_context->header_size = round_up(sizeof(struct lsv_bitmap_header) + BITMAP_CHUNK_HEADER_SIZE(lsv_vol->size), LSV_CHUNK_SIZE);

                bitmap_context->bitmap_header = xmalloc(bitmap_context->header_size);
                if(!bitmap_context->bitmap_header) {
                        xfree(bitmap_context);
                        return -ENOMEM;
                }

                memset(bitmap_context->bitmap_header, 0, bitmap_context->header_size);

                bitmap_context->volume_context = volume_context;

                lsv_bitmap_init_lock(bitmap_context);
                //        bitmap_context->bitmap_cache_chunk_id = -1;
                //        bitmap_context->bitmap_cache = malloc(BITMAP_CACHE_SIZE);

                bitmap_context->is_loading = 1;
                bitmap_context->flags = flag;
                LSV_DBUG("loading header, chunk_id=%d\r\n", ((lsv_volume_proto_t *)volume_context)->u.bitmap_chunk_id);

                ret = bitmap_header_load(bitmap_context, ((lsv_volume_proto_t *)volume_context)->u.bitmap_chunk_id);
                if(ret) {
                        DERROR("load header error, ret=%d\r\n", ret);
                        xfree(bitmap_context->bitmap_header);
                        xfree(bitmap_context);
                        return ret;
                }

                //lsv_bitmap_cache_init(bitmap_context);

#ifdef LSV_BITMAP_ENABLE_HASH_CACHE
                #ifdef CACHE_NEW
                        void **c =&bitmap_context->hash_cache;
                        ret = cache_api->init((bm_cache_t *)c, "bm_hash_cache", volume_context);
                        if (unlikely(ret))
                                UNIMPLEMENTED(__DUMP__);
                #else
                        bitmap_context->hash_cache = xmalloc(sizeof(lsv_hash_cache_context_t));
                        ret = lsv_hash_cache_init(bitmap_context->hash_cache, "lsv", volume_context);
                        LSV_DBUG("bitmap hash_cache init, %p\n", bitmap_context->hash_cache);
                #endif
#endif
        }


        ((lsv_volume_proto_t *)volume_context)->bitmap_context = bitmap_context;

        return ret;
}


int lsv_bitmap_init_by_buf(void *volume_context, uint64_t parent_vol_id, void *buf, uint32_t flags) //clone.
{
        //struct lsv_bitmap_context *bitmap_context = ((lsv_volume_proto_t *)volume_context)->bitmap_context;

        (void) parent_vol_id;
        struct lsv_bitmap_header * snap_hdr = (struct lsv_bitmap_header *)buf;
        if(snap_hdr)
        {
                struct lsv_bitmap_context * node_new;

                int ret = lsv_bitmap_create_node(volume_context, &node_new, LSV_BITMAP_FALSE, LSV_BITMAP_SNAP_TYPE_VOLUME);
                if(ret)
                        return ret;

                node_new->volume_context = volume_context;
                node_new->flags = flags;
                //strncpy(node_new->bitmap_header->name, snap_name, sizeof(node_new->bitmap_header->name));

                int idx = 0;
                for(idx=0;idx<=snap_hdr->vvol_id;idx++) //copy from parent.
                {
                        node_new->bitmap_header->vvol_map[idx] = snap_hdr->vvol_map[idx];
                }

                node_new->bitmap_header->vvol_id = idx;
                node_new->bitmap_header->vvol_map[idx].vol_id = ((lsv_volume_proto_t *)volume_context)->ino;;

                for(int j=0;j<snap_hdr->chunk_map_count;j++)
                {
                        node_new->bitmap_header->chunk_map[j].chunk_id = snap_hdr->chunk_map[j].chunk_id;
                        if(node_new->bitmap_header->chunk_map[j].chunk_id)
                        {
                                node_new->bitmap_header->chunk_map[j].vvol_id = snap_hdr->chunk_map[j].vvol_id;

                                if(node_new->bitmap_header->chunk_map[j].vvol_id == 0) //parent is root.
                                        node_new->bitmap_header->chunk_map[j].vvol_id = 1;
                                node_new->bitmap_header->chunk_map[j].is_ref = 1;
                        }
                        else

                        {
                                node_new->bitmap_header->chunk_map[j].vvol_id = 0;
                                node_new->bitmap_header->chunk_map[j].is_ref = 0;
                        }
                }

                /*struct lsv_bitmap_context *p = lsv_bitmap_get_parent_volume(bitmap_context);
                 for(int i=0;p&&i<MAX_VVOL_COUNT;i++)
                 {
                 bitmap_context->bitmap_header->vvol_map[i + 1].vol_id = p->bitmap_header->vvol_map[0].vol_id;

                 p = lsv_bitmap_get_parent_volume(p);
                 }*/

                memset(node_new->bitmap_header->header_chunk_ids, 0, sizeof(node_new->bitmap_header->header_chunk_ids));

                ret = bitmap_header_save(node_new, SAVE_HEADER_LEVEL_ALL);

                if(ret)
                {
                        //delete node.

                        for(int j=0;j<sizeof(node_new->bitmap_header->header_chunk_ids) / sizeof(uint32_t); j++)
                                lsv_volume_chunk_free(volume_context, LSV_BITMAP_STORAGE_TYPE,node_new->bitmap_header->header_chunk_ids[j]);

                        lsv_bitmap_free_node(node_new);

                        return ret;
                }

                lsv_bitmap_init_lock(node_new);

                lsv_bitmap_cache_init(node_new);

                ((lsv_volume_proto_t *)volume_context)->bitmap_context = node_new;

                ((lsv_volume_proto_t *)volume_context)->u.bitmap_chunk_id = node_new->bitmap_header->header_chunk_ids[0];
                //return lsv_bitmap_protect_node(snap, 1);

                return 0;
        }

        return -1;
}

int lsv_bitmap_is_ready(void *volume_context)
{
        struct lsv_bitmap_context *bitmap_context = ((lsv_volume_proto_t *)volume_context)->bitmap_context;

        if(!bitmap_context)     //volume is removed.
                return 0;

        return !bitmap_context->is_loading;
}

int lsv_bitmap_init_by_snap(void *parent_volume_context, void *volume_context, const char *snap_name) //clone.
{
        struct lsv_bitmap_context *bitmap_context = ((lsv_volume_proto_t *)parent_volume_context)->bitmap_context;

        struct lsv_bitmap_context * snap = lsv_bitmap_get_by_name(bitmap_context, snap_name);
        if(!snap)
        {
                struct lsv_bitmap_context * node_new;

                int ret = lsv_bitmap_create_node(parent_volume_context, &node_new, LSV_BITMAP_FALSE, LSV_BITMAP_SNAP_TYPE_VOLUME);
                if(ret)
                        return ret;

                node_new->volume_context = volume_context;
                strncpy(node_new->bitmap_header->name, snap_name, sizeof(node_new->bitmap_header->name));

                for(int j=0;j<snap->bitmap_header->chunk_map_count;j++)
                {
                        node_new->bitmap_header->chunk_map[j].chunk_id = snap->bitmap_header->chunk_map[j].chunk_id;
                        if(node_new->bitmap_header->chunk_map[j].chunk_id)
                        {
                                node_new->bitmap_header->chunk_map[j].vvol_id = snap->bitmap_header->chunk_map[j].vvol_id;
                                node_new->bitmap_header->chunk_map[j].is_ref = 1;
                        }
                        else
                        {
                                node_new->bitmap_header->chunk_map[j].vvol_id = 0;
                                node_new->bitmap_header->chunk_map[j].is_ref = 0;
                        }
                }

                int idx = 0;
                for(idx=0;idx<=snap->bitmap_header->vvol_id;idx++) //copy from parent.
                {
                        bitmap_context->bitmap_header->vvol_map[idx] = snap->bitmap_header->vvol_map[idx];
                }

                bitmap_context->bitmap_header->vvol_id = idx;
                bitmap_context->bitmap_header->vvol_map[idx].vol_id = ((lsv_volume_proto_t *)parent_volume_context)->ino;

                /*struct lsv_bitmap_context *p = lsv_bitmap_get_parent_volume(bitmap_context);
                for(int i=0;p&&i<MAX_VVOL_COUNT;i++)
                {
                        bitmap_context->bitmap_header->vvol_map[i + 1].vol_id = p->bitmap_header->vvol_map[0].vol_id;

                        p = lsv_bitmap_get_parent_volume(p);
                }*/

                ret = bitmap_header_save(node_new, SAVE_HEADER_LEVEL_ALL);

                if(ret)
                {
                        //delte node.

                        for(int j=0;j<sizeof(node_new->bitmap_header->header_chunk_ids) / sizeof(uint32_t); j++)
                                lsv_volume_chunk_free(volume_context, LSV_BITMAP_STORAGE_TYPE,node_new->bitmap_header->header_chunk_ids[j]);

                        lsv_bitmap_free_node(node_new);

                        return ret;
                }

                lsv_bitmap_cache_init(node_new);

                ((lsv_volume_proto_t *)volume_context)->bitmap_context = node_new;

                return lsv_bitmap_protect_node(snap, 1);

                //return 0;
        }

        return -1;
}

void lsv_bitmap_free_single_node(struct lsv_bitmap_context *node)
{
        lsv_bitmap_free_lock(node);

#ifdef LSV_BITMAP_ENABLE_HASH_CACHE
        lsv_hash_cache_destroy(node->hash_cache);
        xfree(node->hash_cache);
        node->hash_cache = NULL;
#else
        lsv_bitmap_cache_free(node);
#endif

        xfree(node);
}

void lsv_bitmap_free_node(struct lsv_bitmap_context *root)
{
        stack_context_t stack;

        stack_init(&stack);

        stack_push(&stack, root);

        while(!stack_empty(&stack))
        {
                struct lsv_bitmap_context *node = stack_pop(&stack);

                if(node->next)
                {
                        stack_push(&stack, node->next);
                }

                if(node->first_child)
                {
                        stack_push(&stack, node->first_child);
                }

                lsv_bitmap_free_single_node(node);
        }
}

int lsv_bitmap_deinit(void *volume_context)
{
        struct lsv_bitmap_context *bitmap_context = ((lsv_volume_proto_t *)volume_context)->bitmap_context;
        if (bitmap_context) {
                lsv_bitmap_free_node(bitmap_context);
                ((lsv_volume_proto_t *)volume_context)->bitmap_context = NULL;
        }

        return 0;
}

int private bitmap_header_save(struct lsv_bitmap_context *bitmap_context, uint32_t level)
{
        lsv_volume_proto_t *lsv_vol = (lsv_volume_proto_t *)(bitmap_context->volume_context);
        uint32_t header_size = bitmap_context->header_size;//sizeof(struct lsv_bitmap_header) + BITMAP_CHUNK_HEADER_SIZE(lsv_vol->size);
        struct lsv_bitmap_header *pheader = bitmap_context->bitmap_header;
        uint32_t header_padding_size = header_size;

        (void) level;
        if(header_padding_size % CHUNK_SIZE)
                header_padding_size += CHUNK_SIZE - (header_padding_size % CHUNK_SIZE);

        LSV_DBUG("header size=%d \r\n", header_padding_size);

        if(pheader->header_chunk_ids[0] == 0)
        {
                for(int i=0;i<header_padding_size;i+= CHUNK_SIZE)
                {
                        uint32_t chunk_id = pheader->header_chunk_ids[i / CHUNK_SIZE];

                        if(chunk_id == 0) //need allocate.
                        {
                                int ret = lsv_bitmap_alloc_chunk(lsv_vol, &chunk_id, 0);
                                if(ret) {
                                        DFATAL("save bitmap header failed, ret = %d.\r\n", ret);
                                        return ret;
                                }

                                pheader->header_chunk_ids[i / CHUNK_SIZE] = chunk_id;
                        }
                }
        }

        for(int i=0;i<header_padding_size;i+= CHUNK_SIZE)
        {
                uint32_t chunk_id = pheader->header_chunk_ids[i / CHUNK_SIZE];
                int ret;

                if(i + CHUNK_SIZE == header_padding_size)   //the last.
                        ret = lsv_bitmap_write_chunk(bitmap_context->volume_context, chunk_id, 0, (header_size % CHUNK_SIZE) != 0? (header_size % CHUNK_SIZE):CHUNK_SIZE, ((char *)pheader) + i);
                else
                        ret = lsv_bitmap_write_chunk(bitmap_context->volume_context, chunk_id, 0, CHUNK_SIZE, ((char *)pheader) + i);

                if(ret) {
                        DFATAL("save bitmap header failed, ret = %d.\r\n", ret);
                        return ret;
                }
        }

        return 0;
}

int private bitmap_header_load_internal(struct lsv_bitmap_context *bitmap_context, uint32_t first_chunk_id)
{
        int ret;
        //lsv_volume_proto_t *lsv_vol = (lsv_volume_proto_t *)(bitmap_context->volume_context);

        uint32_t header_size = bitmap_context->header_size;//sizeof(struct lsv_bitmap_header) + BITMAP_CHUNK_HEADER_SIZE(lsv_vol->size);
        struct lsv_bitmap_header *pheader = BITMAP_HEADER(bitmap_context);
        uint32_t header_padding_size = header_size;

        if(header_padding_size % CHUNK_SIZE)
                header_padding_size += CHUNK_SIZE - (header_padding_size % CHUNK_SIZE);

        pheader->header_chunk_ids[0] = first_chunk_id;

        for(int i=0;i<header_padding_size;i+= CHUNK_SIZE)
        {
                uint32_t chunk_id = pheader->header_chunk_ids[i / CHUNK_SIZE];
                LSV_DBUG("load header, size=%d, chunk_id=%d\r\n", header_size, chunk_id);

                if(i + CHUNK_SIZE == header_padding_size)   //the last.
                        ret = lsv_bitmap_read_chunk(bitmap_context->volume_context, 0, chunk_id, 0, (header_size % CHUNK_SIZE) != 0? (header_size % CHUNK_SIZE):CHUNK_SIZE, ((char *)pheader) + i);
                else
                        ret = lsv_bitmap_read_chunk(bitmap_context->volume_context, 0, chunk_id, 0, CHUNK_SIZE, ((char *)pheader) + i);

                if(ret)
                {
                        //fatal error;
                        DFATAL("bitmap header load error, ret=%d\r\n", ret);
                        return ret;
                }
        }

        memset(pheader->reserved2, 0, sizeof(pheader->reserved2));

        return 0;
}

int private bitmap_header_load(struct lsv_bitmap_context *bitmap_root, uint32_t first_chunk_id)
{
        bitmap_root->bitmap_header->reserved2[1] = 0;
        bitmap_root->bitmap_header->reserved2[2] = 0;
#if LSV_BITMAP_ASYNC_LOAD
        int ret = bitmap_header_load_async(bitmap_root, first_chunk_id);
        LSV_DBUG("load bitmap header, count = %d\r\n", bitmap_root->bitmap_header->reserved2[1]);

        return ret;
#else
        int ret;
#endif
        int icount = 0;

        stack_context_t stack;

        stack_init(&stack);

        stack_push_arg(&stack, bitmap_root, (void *)(uint64_t)first_chunk_id);

        while(!stack_empty(&stack))
        {
                void *arg;
                struct lsv_bitmap_context *node = stack_pop_arg(&stack, &arg);
                struct lsv_bitmap_header *pheader = BITMAP_HEADER(node);

                if(node == bitmap_root)
                {
                        LSV_DBUG("Loading child nodes, root node, %d %d.\r\n", pheader->first_child_chunk_id, pheader->next_chunk_id);
                }
                else
                {
                        LSV_DBUG("Loading child nodes, %s, %d %d.\r\n", pheader->name, pheader->first_child_chunk_id, pheader->next_chunk_id);
                }

                ret = bitmap_header_load_internal(node, (uint32_t)(uint64_t)arg);
                if(ret)
                        return ret;

                icount ++;
                LSV_DBUG("load bitmap header, count = %d\r\n", icount);

                if(pheader->first_child_chunk_id)
                {
                        struct lsv_bitmap_context *bitmap_node;

                        ret = lsv_bitmap_create_node(bitmap_root->volume_context, &bitmap_node, LSV_BITMAP_FALSE, 0);
                        if(ret)
                                return ret;

                        //current node is parent.
                        bitmap_node->parent = node;
                        node->first_child = bitmap_node;

                        stack_push_arg(&stack, bitmap_node, (void *)(uint64_t)pheader->first_child_chunk_id);
                        //ret = bitmap_header_load(bitmap_node, pheader->first_child_chunk_id);
                       // if(ret)
                        //        return ret;
                }

                if(pheader->next_chunk_id)
                {
                        struct lsv_bitmap_context *bitmap_node;

                        LSV_DBUG("Loading brother node, thunkid=%d.", pheader->next_chunk_id);
                        ret = lsv_bitmap_create_node(bitmap_root->volume_context, &bitmap_node, LSV_BITMAP_FALSE, 0);
                        if(ret)
                                return ret;

                        //brother, same parent.
                        bitmap_node->parent = node->parent;
                        node->next = bitmap_node;

                        //ret = bitmap_header_load(bitmap_node, pheader->next_chunk_id);
                        //if(ret)
                         //       return ret;
                        stack_push_arg(&stack, bitmap_node, (void *)(uint64_t)pheader->next_chunk_id);

                        LSV_DBUG("Loading brother node ok.");
                }

                if(bitmap_root->bitmap_header->active_chunk_id == pheader->header_chunk_ids[0])
                        bitmap_root->active = node;
        }

        bitmap_root->is_loading = 0;

        return 0;
}


int bitmap_header_read(struct lsv_bitmap_context *bitmap_context, uint64_t off, uint32_t len, void * buffer)
{
        lsv_volume_proto_t *lsv_vol = (lsv_volume_proto_t *)(bitmap_context->volume_context);
        struct lsv_bitmap_header *pheader = bitmap_context->bitmap_header;

        uint32_t header_size = sizeof(struct lsv_bitmap_header) + BITMAP_CHUNK_HEADER_SIZE(lsv_vol->size);

        if(off >= header_size)
                return -1;

        if(off + len > header_size)
                len = header_size - (uint32_t)off;

        memcpy(buffer, (char *)pheader + off, len);

        return 0;
}

int lsv_bitmap_resize_node(struct lsv_bitmap_context *bitmap_context, uint64_t new_size, struct lsv_bitmap_context **new_context)
{
        lsv_volume_proto_t *lsv_vol = (lsv_volume_proto_t *)(bitmap_context->volume_context);
        struct lsv_bitmap_header *pheader = BITMAP_HEADER(bitmap_context);
        uint32_t header_size = sizeof(struct lsv_bitmap_header) + BITMAP_CHUNK_HEADER_SIZE(lsv_vol->size);
        uint32_t new_header_size = sizeof(struct lsv_bitmap_header) + BITMAP_CHUNK_HEADER_SIZE(new_size);
        uint32_t new_chunk_map_count = NUM_OF_BITMAP_CHUNK(new_size);
        uint32_t bitmap_size = sizeof(struct lsv_bitmap_context) + BITMAP_CHUNK_HEADER_SIZE(lsv_vol->size);
        uint32_t new_bitmap_size = sizeof(struct lsv_bitmap_context) + BITMAP_CHUNK_HEADER_SIZE(new_size);

        if(new_header_size < header_size)
                return -1;

        if(new_chunk_map_count == pheader->chunk_map_count)
                return 0;

        void *new_bitmap = xmalloc(new_bitmap_size);
        assert(new_bitmap);

        memcpy(new_bitmap, bitmap_context, bitmap_size);
        memset((uint8_t *)new_bitmap + bitmap_size, 0, new_bitmap_size - bitmap_size);

        xfree(bitmap_context);

        *new_context = new_bitmap;

        assert((*new_context)->volume_context == (void *)lsv_vol);

        lsv_bitmap_init_lock(*new_context);

        int ret = bitmap_header_save(new_bitmap, SAVE_HEADER_LEVEL_ALL);

        return ret;
}

int lsv_bitmap_release_cache(void *volume_context)
{
        struct lsv_bitmap_context *root = ((lsv_volume_proto_t *)volume_context)->bitmap_context;

#ifdef LSV_BITMAP_ENABLE_HASH_CACHE
        lsv_hash_cache_release(root->hash_cache);
#else
        assert(0);
#endif
        return 0;
}

int lsv_bitmap_resize(void *volume_context, uint64_t new_size)
{
        lsv_volume_proto_t *lsv_info = (lsv_volume_proto_t *)volume_context;
        struct lsv_bitmap_context *root = ((lsv_volume_proto_t *)volume_context)->bitmap_context;

        stack_context_t stack;

        stack_init(&stack);

        stack_push_arg(&stack, root, &((lsv_volume_proto_t *)volume_context)->bitmap_context);

        lsv_wrlock(&lsv_info->io_lock);

        while(!stack_empty(&stack))
        {
                struct lsv_bitmap_context **arg; //tripe pointer..
                struct lsv_bitmap_context *node = stack_pop_arg(&stack, (void *)&arg);

                int ret = lsv_bitmap_resize_node(node, new_size, arg);
                if(ret)
                {
                        lsv_unlock(&lsv_info->io_lock);

                        return ret;
                }

                node = *arg;

                if(node->next)
                        stack_push_arg(&stack, node->next, &node->next);

                if(node->first_child)
                {
                        node->first_child->parent = node;
                        stack_push_arg(&stack, node->first_child, &node->first_child);
                }
        }

        lsv_unlock(&lsv_info->io_lock);
        return 0;
}

int private lsv_read_bitmap_node(struct lsv_bitmap_context *bitmap_context, uint32_t vvol_id, uint64_t chunk_id, uint32_t chunk_off, uint32_t len, void *bitmap_buf)
{
        uint64_t off_new = chunk_off;
        uint32_t len_new = len;

        return lsv_hash_cache_get(bitmap_context->hash_cache, chunk_id, chunk_off, len, bitmap_buf);
        
        assert(chunk_off + len <= LSV_CHUNK_SIZE);
        if(chunk_off % LSV_PAGE_SIZE)
        {
                off_new = chunk_off - (chunk_off % LSV_PAGE_SIZE);
                len_new += chunk_off - off_new;
        }

        if(len_new % LSV_PAGE_SIZE != 0)
                len_new = len_new + (LSV_PAGE_SIZE - (len_new % LSV_PAGE_SIZE));
#ifdef LSV_BITMAP_ENABLE_HASH_CACHE
        #ifdef CACHE_NEW
                return cache_api->get(bitmap_context->hash_cache, chunk_id, chunk_off, len, allocate,  bitmap_buf);
        #else
                return lsv_hash_cache_get(bitmap_context->hash_cache, chunk_id, chunk_off, len, bitmap_buf);
        #endif
#else
        lsv_bitmap_cache_unit_t *cache_ref;
        uint8_t *cache_buf = lsv_bitmap_cache_get(bitmap_context, vvol_id, chunk_id, off_new, len_new, 0, &cache_ref);
        //assert(cache_buf);
        if(!cache_buf) {
                DFATAL("lsv bitmap cache get failed, ret=%d\r\n", errno);
                return errno;
        }

        assert(cache_ref->chunk_id == chunk_id);

        memcpy(bitmap_buf, cache_buf + chunk_off, len);

        lsv_bitmap_cache_page_unlock(cache_ref, off_new / PAGE_SIZE, len_new / PAGE_SIZE);
        lsv_bitmap_cache_unlock(cache_ref);
#endif
        return 0;
}

int private lsv_release_bitmap_node(struct lsv_bitmap_context *bitmap_context, uint32_t vvol_id, uint64_t chunk_id, uint32_t chunk_off, uint32_t len)
{
        (void) vvol_id;
        YASSERT(chunk_off + len <= LSV_CHUNK_SIZE);
        return -1;//cache_api->release(bitmap_context->hash_cache, chunk_id, chunk_off, len);
}


int private lsv_read_bitmap(void *volume_context, uint32_t vvol_id, uint64_t chunk_id, uint32_t chunk_off, uint32_t len, void *bitmap_buf)
{
        struct lsv_bitmap_context * node = lsv_bitmap_volume_to_node(volume_context);

        return lsv_read_bitmap_node(node, vvol_id, chunk_id, chunk_off, len, bitmap_buf);
}

int private lsv_write_bitmap_node(void *volume_context, struct lsv_bitmap_context *bitmap_context,
                                  uint32_t chunk_id, uint32_t chunk_off, uint32_t len, void *bitmap_buf, int flags)
{
#ifdef LSV_BITMAP_ENABLE_HASH_CACHE
    DBUG("write bitmap at: chunk_id=%d, chunk_off: %d\r\n", chunk_id, chunk_off);
    return lsv_hash_cache_update(bitmap_context->hash_cache, chunk_id, chunk_off, len, bitmap_buf);
#endif

        uint32_t off_new = chunk_off;
        uint32_t len_new = len;
        int ret;
        lsv_volume_proto_t *lsv_info = volume_context;

        assert(chunk_off + len <= LSV_CHUNK_SIZE);

        if(chunk_off % LSV_PAGE_SIZE)
        {
                off_new = chunk_off - (chunk_off % LSV_PAGE_SIZE);
                len_new += chunk_off - off_new;
        }

        if(len_new % LSV_PAGE_SIZE != 0)
                len_new = len_new + (LSV_PAGE_SIZE - (len_new % LSV_PAGE_SIZE));

        lsv_bitmap_cache_unit_t *cache_ref = NULL;
        uint8_t *cache_buf = lsv_bitmap_cache_get(bitmap_context, 0, chunk_id, off_new, len_new, 0, &cache_ref);
        //assert(cache_buf);
        if(!cache_buf) {
                DFATAL("lsv bitmap cache get failed, ret=%d\r\n", errno);
                return errno;
        }

        assert(cache_ref->chunk_id == chunk_id);

        #if 0
        if(!(flags & LSV_FEATURE_BITMAP_GC_VALUE_UP))
        {//for gc
                lsv_bitmap_unit_t *pbitmap = (lsv_bitmap_unit_t *)(cache_buf + chunk_off);
                for(int i=0;i<len;i+=sizeof(lsv_bitmap_unit_t))
                {
                        //TODO 是否表示之前已经有数据，现在是update操作，故需要将gc的回收价值加1，同时回收rcache对应的page
                        if(pbitmap->chunk_id){
                                lsv_rcache_page_clean(bitmap_context->volume_context, pbitmap->chunk_id, LSV_PAGE_SIZE * pbitmap->chunk_page_off);
                                lsv_gc_valueup(bitmap_context->volume_context, pbitmap->chunk_id, pbitmap->chunk_page_off);
                        }
                        pbitmap ++;
                }
        }
        #endif

        memcpy(cache_buf + chunk_off, bitmap_buf, len);

        //assert(((lsv_bitmap_unit_t *)bitmap_buf)->chunk_id < 1024 * 1024);

        if(bitmap_context->is_session)
        {
                lsv_bitmap_cache_mark_dirty(cache_ref, off_new / PAGE_SIZE, len_new / PAGE_SIZE);

                ret = 0;
        }
        else
        {
                //important, may dirty.
                if(lsv_is_bitmap_cache_dirty(cache_ref))
                {
                        //lsv_bitmap_cache_mark_dirty(cache_ref, off_new / PAGE_SIZE, len_new / PAGE_SIZE);       //may not us, do it again.
                        //lsv_bitmap_commit_dirty_node(bitmap_context);

                        lsv_info->row2_stat.cache_swapout++;

                        ret = lsv_bitmap_write_chunk(bitmap_context->volume_context, chunk_id, 0, CHUNK_SIZE, cache_buf); //alocate new chunk.
                        if(!ret)
                                lsv_bitmap_cache_clear_dirty(cache_ref, 0, CHUNK_SIZE / LSV_PAGE_SIZE);
                        else
                                DFATAL("lsv_bitmap_write_chunk failed %d. \r\n", ret);
                }
                else
                {
                        lsv_info->row2_stat.meta_write++;

                        //really important, prevent being yield and paged out.
                        //lsv_bitmap_cache_mark_dirty(cache_ref, off_new / LSV_PAGE_SIZE, len_new / LSV_PAGE_SIZE);
                        ret = lsv_bitmap_write_chunk(bitmap_context->volume_context, chunk_id, off_new, len_new, cache_buf + off_new);
                        if(!ret)
                                ;//        lsv_bitmap_cache_clear_dirty(cache_ref, off_new / LSV_PAGE_SIZE, len_new / LSV_PAGE_SIZE);
                        else
                                DFATAL("lsv_bitmap_write_chunk failed %d. \r\n", ret);
                }
        }

        lsv_bitmap_cache_page_unlock(cache_ref, off_new / PAGE_SIZE, len_new / PAGE_SIZE);
        lsv_bitmap_cache_unlock(cache_ref);

        if(ret)
        {
                //fatal error.
                return ret;
        }

        return ret;
}

int private lsv_pre_write_bitmap_node(struct lsv_bitmap_context *bitmap_context, uint32_t chunk_id, uint32_t chunk_off, uint32_t len)
{
        uint32_t off_new = chunk_off;
        uint32_t len_new = len;

        assert(chunk_off + len <= LSV_CHUNK_SIZE);

        if(chunk_off % LSV_PAGE_SIZE)
        {
                off_new = chunk_off - (chunk_off % LSV_PAGE_SIZE);
                len_new += chunk_off - off_new;
        }

        if(len_new % LSV_PAGE_SIZE != 0)
                len_new = len_new + (LSV_PAGE_SIZE - (len_new % LSV_PAGE_SIZE));

        lsv_bitmap_cache_pre_load(bitmap_context, 0, chunk_id, off_new, len_new);

        return 0;
}

int private lsv_write_bitmap(void *volume_context, uint32_t chunk_id, uint32_t chunk_off, uint32_t len, void *bitmap_buf)
{
        struct lsv_bitmap_context * node = lsv_bitmap_volume_to_node(volume_context);

        return lsv_write_bitmap_node(volume_context, node, chunk_id, chunk_off, len, bitmap_buf, 0);
}

//read one entry
int private lsv_bitmap_paged_read_node(struct lsv_bitmap_context *bitmap_context, uint64_t off, struct lsv_bitmap_unit *bitmap_buf)
{
        uint64_t chunk_index;
        uint32_t chunk_off;
        uint32_t chunk_id;
        uint32_t vvol_id;

        int     ret = 0;
        struct lsv_bitmap_header *pheader = BITMAP_HEADER(bitmap_context);

        chunk_index = OFFSET_TO_INDEX(off);
        chunk_off = OFFSET_TO_RNDEX(off) *sizeof(struct lsv_bitmap_unit);
        chunk_id = pheader->chunk_map[chunk_index].chunk_id;
        vvol_id = pheader->chunk_map[chunk_index].vvol_id;

        LSV_DBUG("bitmap_read>off:%llu,chunk_id:%u,chunk_off:%u\n", (LLU)off,chunk_id,chunk_off);
        if(chunk_id == 0) //no bitmap allocated.
                memset(bitmap_buf, 0, sizeof(struct lsv_bitmap_unit));
        else
                ret = lsv_read_bitmap_node(bitmap_context, vvol_id, chunk_id, chunk_off, sizeof(struct lsv_bitmap_unit), bitmap_buf);

        if(ret)
        {
                //fatal error.
                return ret;
        }

        return ret;
}

int private lsv_bitmap_paged_read_node_directly(struct lsv_bitmap_context *bitmap_context, uint64_t off, struct lsv_bitmap_unit *bitmap_buf)
{
        uint64_t chunk_index;
        uint32_t chunk_off;
        uint32_t chunk_id;
        uint32_t vvol_id;

        int     ret = 0;
        struct lsv_bitmap_header *pheader = BITMAP_HEADER(bitmap_context);

        chunk_index = OFFSET_TO_INDEX(off);
        chunk_off = OFFSET_TO_RNDEX(off) *sizeof(struct lsv_bitmap_unit);
        chunk_id = pheader->chunk_map[chunk_index].chunk_id;
        vvol_id = pheader->chunk_map[chunk_index].vvol_id;

        uint64_t off_new = chunk_off;
        uint32_t len_new = sizeof(struct lsv_bitmap_unit);

        if(chunk_off % LSV_PAGE_SIZE)
        {
                off_new = chunk_off - (chunk_off % LSV_PAGE_SIZE);
                len_new += chunk_off - off_new;
        }

        if(len_new % LSV_PAGE_SIZE != 0)
                len_new = len_new + (LSV_PAGE_SIZE - (len_new % LSV_PAGE_SIZE));

        LSV_DBUG("bitmap_read>off:%llu,chunk_id:%u,chunk_off:%u\n", (LLU)off,chunk_id,chunk_off);
        if(chunk_id == 0) //no bitmap allocated.
                memset(bitmap_buf, 0, sizeof(struct lsv_bitmap_unit));
        else
        {
                uint8_t *page_buf = xmalloc(PAGE_SIZE);
                ret = lsv_bitmap_read_chunk(bitmap_context->volume_context, vvol_id, chunk_id, off_new, len_new, page_buf);
                if(!ret)
                        memcpy(bitmap_buf, page_buf + (chunk_off - off_new), sizeof(struct lsv_bitmap_unit));

                xfree(page_buf);
        }

        if(ret)
        {
                //fatal error.
                return ret;
        }

        return ret;
}

int private lsv_bitmap_paged_read(void *volume_context, uint64_t off, struct lsv_bitmap_unit *bitmap_buf)
{
        int ret;
        struct lsv_bitmap_context * node = lsv_bitmap_volume_to_node(volume_context);

        lsv_bitmap_rlock(volume_context);

        ret = lsv_bitmap_paged_read_node(node, off, bitmap_buf);

        lsv_bitmap_unlock(volume_context);

        return ret;
}

//set one entry.
int private lsv_bitmap_paged_write_node(void *volume_context, struct lsv_bitmap_context *bitmap_context,
                                        uint64_t off, struct lsv_bitmap_unit *bitmap_buf, int flags)
{
        uint64_t chunk_index;
        uint32_t chunk_off;
        uint32_t chunk_id;
        int     ret = 0;
        struct lsv_bitmap_header *pheader = BITMAP_HEADER(bitmap_context);
        int     header_changed = 0;

        chunk_index = OFFSET_TO_INDEX(off);
        chunk_off = OFFSET_TO_RNDEX(off) * sizeof(struct lsv_bitmap_unit);
        chunk_id = pheader->chunk_map[chunk_index].chunk_id;
        uint8_t *cache_buf;

        #if LSV_BITMAP_CHECK_BITMAP_LBA
        bitmap_buf->lba = off;
        #endif

        if(chunk_id == 0) //no bitmap allocated.
        {
                lsv_bitmap_cache_unit_t *ref = NULL;
                ret = lsv_bitmap_alloc_chunk(bitmap_context->volume_context, &chunk_id, 1);
                if(ret)
                {
                        //fatal error.
                        return ret;
                }

                cache_buf = lsv_bitmap_cache_get(bitmap_context, pheader->chunk_map[chunk_index].vvol_id, 0, 0, 0, 1, &ref);
                if(!cache_buf)
                        return errno;

                pheader->chunk_map[chunk_index].is_ref = 0;
                pheader->chunk_map[chunk_index].chunk_id = chunk_id;
                pheader->chunk_map[chunk_index].vvol_id = lsv_bitmap_get_current_volume(bitmap_context)->bitmap_header->vvol_id;

                header_changed = 1;

                ref->chunk_id = chunk_id;
                lsv_bitmap_cache_mark_dirty(ref, 0, BITMAP_CHUNK_SIZE / PAGE_SIZE);

                LSV_DBUG("lsv bitmap cache set chunk_id=%u\r\n",chunk_id);
        }
        else if(pheader->chunk_map[chunk_index].is_ref) //snapshot.
        {
                lsv_wrlock(&bitmap_context->cow_lock);
                if(! pheader->chunk_map[chunk_index].is_ref)
                        goto out;

                lsv_bitmap_cache_unit_t *ref, *new_ref;
                uint32_t new_chunk_id;
                uint8_t * new_cache_buf;
                /* be carefull to modify this part of code, as I/O function calling maybe yield out of the entry,
                 * and proboly cause memory inconcistentcy after return back,
                 * I/O function must be considered as an aysnc operation.*/
                ret = lsv_bitmap_alloc_chunk(bitmap_context->volume_context, &new_chunk_id, 0);   //chunk is replaced, important, may yeild.
                if(ret)
                {
                        lsv_unlock(&bitmap_context->cow_lock);
                        //fatal error.
                        return ret;
                }

                //BITMAP_CHCHE_ID(bitmap_context) = chunk_id; //id changed, now cheat to lower level write to the new chunk...

                cache_buf = lsv_bitmap_cache_get(bitmap_context, pheader->chunk_map[chunk_index].vvol_id, chunk_id, 0, 0, 1, &ref);
                //assert(cache_buf);
                if(!cache_buf)
                {
                        lsv_unlock(&bitmap_context->cow_lock);
                        return errno;
                }
                assert(ref->chunk_id == chunk_id);

                new_cache_buf = lsv_bitmap_cache_get(bitmap_context, pheader->chunk_map[chunk_index].vvol_id, 0, 0, 0, 1, &new_ref);
                //assert(new_cache_buf);
                if(!new_cache_buf)
                {
                        lsv_unlock(&bitmap_context->cow_lock);
                        return errno;
                }

                if(pheader->chunk_map[chunk_index].is_ref) //others already did.
                {
                        memcpy(new_cache_buf, cache_buf, CHUNK_SIZE);

                        new_ref->chunk_id = new_chunk_id; //id changed, no need write here, just cheat to lower level write to the new chunk...

                        pheader->chunk_map[chunk_index].is_ref = 0;
                        pheader->chunk_map[chunk_index].chunk_id = chunk_id = new_chunk_id;
                        pheader->chunk_map[chunk_index].vvol_id = lsv_bitmap_get_current_volume(bitmap_context)->bitmap_header->vvol_id;

                        lsv_bitmap_cache_mark_dirty(new_ref, 0, BITMAP_CHUNK_SIZE / PAGE_SIZE);
                        header_changed = 1;
                }
                else
                        LSV_DBUG("lsv cow yeild happen.\r\n");

                lsv_unlock(&bitmap_context->cow_lock);
        }

        out:

        //if not set then set current volume id.
        if(bitmap_buf->vvol_id == 0)
                bitmap_buf->vvol_id = bitmap_context->bitmap_header->vvol_id;

        ret = lsv_write_bitmap_node(volume_context, bitmap_context, chunk_id, chunk_off, sizeof(struct lsv_bitmap_unit), bitmap_buf, flags);

        if(ret)
        {
                //fatal error.
                return ret;
        }

        if(header_changed)
        {
                return bitmap_header_save(bitmap_context, SAVE_HEADER_LEVEL_ALL);
        }
        else
                return ret;
}

int private lsv_bitmap_paged_write(void *volume_context, uint64_t off, struct lsv_bitmap_unit *bitmap_buf)
{
        int ret;
        struct lsv_bitmap_context *root = ((lsv_volume_proto_t *)volume_context)->bitmap_context;
        struct lsv_bitmap_context * node = lsv_bitmap_volume_to_node(volume_context);

        lsv_bitmap_rlock(volume_context);

        ret = lsv_bitmap_paged_write_node(volume_context, node, off, bitmap_buf, 0);

        lsv_bitmap_unlock(volume_context);

        if(! (root->flags & LSV_BITMAP_FLAG_DISABLE_GC))
                lsv_gc_valueup_commit(volume_context);

        return ret;
}

int private lsv_bitmap_paged_pre_write_node(struct lsv_bitmap_context *bitmap_context, uint64_t off)
{
        uint64_t chunk_index;
        uint32_t chunk_off;
        uint32_t chunk_id;
        int     ret = 0;
        struct lsv_bitmap_header *pheader = BITMAP_HEADER(bitmap_context);

        chunk_index = OFFSET_TO_INDEX(off);
        chunk_off = OFFSET_TO_RNDEX(off) * sizeof(struct lsv_bitmap_unit);
        chunk_id = pheader->chunk_map[chunk_index].chunk_id;

        ret = lsv_pre_write_bitmap_node(bitmap_context, chunk_id, chunk_off, sizeof(struct lsv_bitmap_unit));

        return ret;
}

int private lsv_bitmap_paged_pre_write(void *volume_context, uint64_t off)
{
        int ret;
        struct lsv_bitmap_context * node = lsv_bitmap_volume_to_node(volume_context);

        lsv_bitmap_rlock(volume_context);

        ret = lsv_bitmap_paged_pre_write_node(node, off);

        lsv_bitmap_unlock(volume_context);

        return ret;
}

int private lsv_bitmap_paged_delete(void *volume_context, uint64_t off)
{
        struct lsv_bitmap_unit bitmap = {0};

        lsv_bitmap_rlock(volume_context);
        int ret = lsv_bitmap_paged_write(volume_context, off, &bitmap);
        lsv_bitmap_unlock(volume_context);

        return ret;
}

int private lsv_bitmap_batch_read_node(struct lsv_bitmap_context *bitmap_context, uint64_t off, uint32_t len, void *bitmap_buf)
{
        uint64_t prev_chunk_index = -1;
        uint64_t prev_off = off;
        uint32_t prev_len = 0;
        uint64_t chunk_off;
        uint64_t chunk_id;
        uint32_t vvol_id;
        struct lsv_bitmap_context *root = ((lsv_volume_proto_t *)bitmap_context->volume_context)->bitmap_context;
        int     ret = 0;
        struct lsv_bitmap_header *pheader = BITMAP_HEADER(bitmap_context);

        (void)len;

        for(int i=0;i<len / LSV_PAGE_SIZE;i++)
        {
                uint64_t chunk_index = OFFSET_TO_INDEX(off + i * LSV_PAGE_SIZE);

                if(unlikely(prev_chunk_index == -1))
                {
                        prev_chunk_index = chunk_index;
                        prev_off = off + i * LSV_PAGE_SIZE;
                        prev_len = LSV_PAGE_SIZE;
                }
                else if(chunk_index != prev_chunk_index || prev_off + prev_len != (off + i * LSV_PAGE_SIZE))
                {
                        chunk_off = OFFSET_TO_RNDEX(prev_off) * sizeof(struct lsv_bitmap_unit);
                        chunk_id = pheader->chunk_map[prev_chunk_index].chunk_id;
                        vvol_id = pheader->chunk_map[prev_chunk_index].vvol_id;

                        //Proceed the previous record.
                        if(chunk_id == 0) //no bitmap allocated.
                                memset(bitmap_buf + DATAOFF_TO_BITMAPOFF(prev_off - off), 0, DATAOFF_TO_BITMAPOFF(prev_len));
                        else {
                                LSV_DBUG("metadata lba:%llu,len:%d->chunk_id:%ju,chunk_off:%ju\n", (LLU)off,len, chunk_id,chunk_off);
                                ret = lsv_read_bitmap_node(bitmap_context, vvol_id, chunk_id, (uint32_t)chunk_off,
                                                           DATAOFF_TO_BITMAPOFF(prev_len), bitmap_buf + DATAOFF_TO_BITMAPOFF(prev_off - off));

                                if(!ret && pheader->chunk_map[prev_chunk_index].is_ref && root->read_snap_cb) {
                                        root->read_snap_cb(0, (lsv_bitmap_unit_t *)((uint8_t *)bitmap_buf + DATAOFF_TO_BITMAPOFF(prev_off - off)), 
                                                        DATAOFF_TO_BITMAPOFF(prev_len) / sizeof(lsv_bitmap_unit_t));
                                }

                                #if LSV_BITMAP_CHECK_BITMAP_LBA
                                struct lsv_bitmap_unit *bitmaps = (struct lsv_bitmap_unit *)bitmap_buf + DATAOFF_TO_BITMAPOFF(prev_off - off);
                                for(int i=0;i<DATAOFF_TO_BITMAPOFF(prev_len) / sizeof(struct lsv_bitmap_unit);i++) {
                                        assert(bitmaps->lba == off + i * LSV_PAGE_SIZE);
                                        bitmaps ++;
                                }
                                #endif
                        }

                        if(ret)
                        {
                                //fatal error.
                                return ret;
                        }

                        prev_chunk_index = chunk_index;

                        prev_off = off + i * LSV_PAGE_SIZE;
                        prev_len = LSV_PAGE_SIZE;
                }
                else
                        prev_len += LSV_PAGE_SIZE;
        }

        if(prev_chunk_index != -1)
        {
                chunk_off = OFFSET_TO_RNDEX(prev_off) * sizeof(struct lsv_bitmap_unit);
                chunk_id = pheader->chunk_map[prev_chunk_index].chunk_id;
                vvol_id = pheader->chunk_map[prev_chunk_index].vvol_id;

                if(chunk_id == 0) //no bitmap allocated.
                        memset(bitmap_buf + DATAOFF_TO_BITMAPOFF(prev_off - off), 0,DATAOFF_TO_BITMAPOFF(prev_len));
                else {
                        LSV_DBUG("metadata lba:%llu,len:%d->chunk_id:%ju,chunk_off:%ju\n", (LLU)off,len, chunk_id,chunk_off);
                        ret = lsv_read_bitmap_node(bitmap_context, vvol_id, chunk_id, (uint32_t)chunk_off,
                                                   DATAOFF_TO_BITMAPOFF(prev_len), bitmap_buf + DATAOFF_TO_BITMAPOFF(prev_off - off));

                        if (!ret && pheader->chunk_map[prev_chunk_index].is_ref && root->read_snap_cb) {
                                root->read_snap_cb(0, (lsv_bitmap_unit_t *)((uint8_t *)bitmap_buf + DATAOFF_TO_BITMAPOFF(prev_off - off)), 
                                                        DATAOFF_TO_BITMAPOFF(prev_len) / sizeof(lsv_bitmap_unit_t));

                                #if LSV_BITMAP_CHECK_BITMAP_LBA
                                struct lsv_bitmap_unit *bitmaps = (struct lsv_bitmap_unit *)bitmap_buf + DATAOFF_TO_BITMAPOFF(prev_off - off);
                                for(int i=0;i<DATAOFF_TO_BITMAPOFF(prev_len) / sizeof(struct lsv_bitmap_unit);i++) {
                                        assert(bitmaps->lba == off + i * LSV_PAGE_SIZE);
                                        bitmaps ++;
                                }
                                #endif

                        }
                }

        }

        return ret;
}

// TODO. need release by bitmap_buf
int private lsv_bitmap_batch_release_node(struct lsv_bitmap_context *bitmap_context, uint64_t off, uint32_t len, void *bitmap_buf)
{
        int ret = 0;
        uint64_t prev_chunk_index = -1;
        uint64_t prev_off = off;
        uint32_t prev_len = 0;
        uint64_t chunk_off;
        uint64_t chunk_id;
        uint32_t vvol_id;

        (void) bitmap_buf;

        struct lsv_bitmap_header *pheader = BITMAP_HEADER(bitmap_context);

        assert(0);
        for (int i = 0; i < len / LSV_PAGE_SIZE; i++)
        {
                uint64_t chunk_index = OFFSET_TO_INDEX(off + i * LSV_PAGE_SIZE);

                if (prev_chunk_index == -1) {
                        prev_chunk_index = chunk_index;
                        prev_off = off + i * LSV_PAGE_SIZE;
                        prev_len = LSV_PAGE_SIZE;
                } else if (chunk_index != prev_chunk_index || prev_off + prev_len != (off + i * LSV_PAGE_SIZE)) {
                        chunk_off = OFFSET_TO_RNDEX(prev_off) * sizeof(struct lsv_bitmap_unit);
                        chunk_id = pheader->chunk_map[prev_chunk_index].chunk_id;
                        vvol_id = pheader->chunk_map[prev_chunk_index].vvol_id;

                        //Proceed the previous record.
                        if (likely(chunk_id)) { //no bitmap allocated.
                                ret = lsv_release_bitmap_node(bitmap_context, vvol_id, chunk_id, (uint32_t)chunk_off, DATAOFF_TO_BITMAPOFF(prev_len));
                                if (unlikely(ret)) {
                                        GOTO(err_ret, ret);
                                }
                        }

                        prev_chunk_index = chunk_index;

                        prev_off = off + i * LSV_PAGE_SIZE;
                        prev_len = LSV_PAGE_SIZE;
                } else {
                        prev_len += LSV_PAGE_SIZE;
                }
        }

        if (prev_chunk_index != -1) {
                chunk_off = OFFSET_TO_RNDEX(prev_off) * sizeof(struct lsv_bitmap_unit);
                chunk_id = pheader->chunk_map[prev_chunk_index].chunk_id;
                vvol_id = pheader->chunk_map[prev_chunk_index].vvol_id;

                if (likely(chunk_id)) {
                        ret = lsv_release_bitmap_node(bitmap_context, vvol_id, chunk_id, (uint32_t)chunk_off, DATAOFF_TO_BITMAPOFF(prev_len));
                        if (unlikely(ret))
                                GOTO(err_ret, ret);

                }

        }

        return 0;
err_ret:
        return ret;
}

int private lsv_bitmap_batch_read(void *volume_context, uint64_t off, uint32_t len, void *bitmap_buf)
{
        struct lsv_bitmap_context * node = lsv_bitmap_volume_to_node(volume_context);

        lsv_bitmap_rlock(volume_context);

        int ret = lsv_bitmap_batch_read_node(node, off, len, bitmap_buf);
        if (unlikely(ret))
                GOTO(err_unlock, ret);


#if 0
        struct lsv_bitmap_unit *pbitmap_buf = (struct lsv_bitmap_unit *)bitmap_buf;
        for (int i = 0; i < len; i += LSV_PAGE_SIZE) {
                LSV_DBUG("%d. volume logic off:%llu,len:%d, vvol_id:%u, chunk_id:%u,chunk_page_off:%u\n", i, (LLU)off + i,
                      LSV_PAGE_SIZE, pbitmap_buf->vvol_id, pbitmap_buf->chunk_id, pbitmap_buf->chunk_page_off);
                pbitmap_buf ++;
        }
#endif

        lsv_bitmap_unlock(volume_context);

        return 0;
err_unlock:
        lsv_bitmap_unlock(volume_context);
        return ret;
}

int private lsv_bitmap_batch_release(void *volume_context, uint64_t off, uint32_t len, void *bitmap_buf)
{
        int ret;

        struct lsv_bitmap_context * node = lsv_bitmap_volume_to_node(volume_context);

        ret = lsv_bitmap_batch_release_node(node, off, len, bitmap_buf);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        YASSERT(0 && "!!!");
        return ret;
}

static inline int lsv_bitmap_batch_write_node(void *volume_context, struct lsv_bitmap_context *bitmap_context,
                                        uint64_t off, uint32_t len, void *bitmap_buf)
{
        uint64_t prev_chunk_index = -1;
        uint64_t prev_off = off;
        uint32_t prev_len = 0;
        uint32_t chunk_off;
        uint32_t chunk_id;
        int     ret = 0;
        struct lsv_bitmap_header *pheader = BITMAP_HEADER(bitmap_context);
        struct lsv_bitmap_context *root = ((lsv_volume_proto_t *)volume_context)->bitmap_context;
        int     header_changed = 0;

        //if not set then set current volume id.

        struct lsv_bitmap_unit *bitmaps = (struct lsv_bitmap_unit *)bitmap_buf;
        for(int i=0;i<len;i+=LSV_PAGE_SIZE)
        {
                if(bitmaps->vvol_id == 0)
                        bitmaps->vvol_id = lsv_bitmap_get_current_volume(bitmap_context)->bitmap_header->vvol_id;

                LSV_DBUG("%d. data off:%llu,len:%d, vvol_id:%d, chunk_id:%u,chunk_page_off:%u\n", i, (LLU)off, len, bitmaps->vvol_id, bitmaps->chunk_id, bitmaps->chunk_page_off);

                #if LSV_BITMAP_CHECK_BITMAP_LBA
                        bitmaps->lba = off + i;
                #endif

                bitmaps ++;
        }

        (void)len;

        for(int i=0;i<len / LSV_PAGE_SIZE;i++)
        {
                uint64_t chunk_index = OFFSET_TO_INDEX(off + i * LSV_PAGE_SIZE);

                if(prev_chunk_index == -1)
                {
                        prev_chunk_index = chunk_index;
                        prev_off = off + i * LSV_PAGE_SIZE;
                        prev_len = LSV_PAGE_SIZE;
                }
                else if(chunk_index != prev_chunk_index || prev_off + prev_len != (off + i * LSV_PAGE_SIZE))
                {
                        chunk_off = OFFSET_TO_RNDEX(prev_off) * sizeof(struct lsv_bitmap_unit);
                        chunk_id = pheader->chunk_map[prev_chunk_index].chunk_id;
                        uint8_t *cache_buf;

                        //Proceed the previous record.
                        if(unlikely(chunk_id == 0)) //no bitmap allocated.
                        {
                                lsv_bitmap_cache_unit_t *ref;

                                assert(0);
                                ret = lsv_bitmap_alloc_chunk(bitmap_context->volume_context, &chunk_id, 1);
                                if(ret)
                                {
                                        //fatal error.
                                        return ret;
                                }

                                cache_buf = lsv_bitmap_cache_get(bitmap_context, 0, 0, 0, 0, 1, &ref);
                                if(!cache_buf)
                                        return errno;
                                //zero buf, will ready to operate the chunk.
                                //memset(BITMAP_CACHE(bitmap_context), 0, BITMAP_CACHE_SIZE);
                                //BITMAP_CACHE_ID(bitmap_context) = chunk_id;

                                pheader->chunk_map[prev_chunk_index].is_ref = 0;
                                pheader->chunk_map[prev_chunk_index].chunk_id = chunk_id;
                                pheader->chunk_map[prev_chunk_index].vvol_id = lsv_bitmap_get_current_volume(bitmap_context)->bitmap_header->vvol_id;

                                header_changed = 1;

                                ref->chunk_id = chunk_id;

                                lsv_bitmap_cache_mark_dirty(ref, 0, BITMAP_CHUNK_SIZE / PAGE_SIZE);
                        }
                        else if(unlikely(pheader->chunk_map[prev_chunk_index].is_ref))
                        {
                                lsv_bitmap_cache_unit_t *ref;
                                uint32_t new_chunk_id = 0;

                                assert(0);
                                ret = lsv_bitmap_alloc_chunk(bitmap_context->volume_context, &new_chunk_id, 0);
                                if(ret)
                                {
                                        //fatal error.
                                        return ret;
                                }

                                cache_buf = lsv_bitmap_cache_get(bitmap_context, pheader->chunk_map[prev_chunk_index].vvol_id, chunk_id, 0,0,1,&ref);
                                if(!cache_buf) {
                                        DFATAL("lsv bitmap cache get failed, ret=%d\r\n", errno);
                                        return errno;
                                }

                                if(pheader->chunk_map[prev_chunk_index].vvol_id && pheader->chunk_map[prev_chunk_index].vvol_id != lsv_bitmap_get_current_volume(bitmap_context)->bitmap_header->vvol_id)
                                {
                                        uint64_t vol_id = 0;
                                        lsv_bitmap_vvol_to_vol(bitmap_context->volume_context, pheader->chunk_map[prev_chunk_index].vvol_id, &vol_id);

                                        LSV_DBUG("volume_proto_remote_read_chunk vold_id=%lld\r\n", (LLU)vol_id);

                                        ret = volume_proto_remote_read_chunk(bitmap_context->volume_context, vol_id, chunk_id, 0, CHUNK_SIZE, cache_buf);

                                        if(ret) {
                                                DFATAL("volume_proto_remote_read_chunk error %d\r\n", ret);
                                                //fatal error.
                                                lsv_bitmap_free_chunk(bitmap_context->volume_context, chunk_id);

                                                return ret;
                                        }

                                        LSV_DBUG("volume_proto_remote_read_chunk good\r\n");
                                }

                                assert(cache_buf);

                                LSV_DBUG("bitmap cow chunk_id %u to new_chunk_id %u \r\n", chunk_id, new_chunk_id);
                                //BITMAP_CHCHE_ID(bitmap_context) = chunk_id; //id changed, now cheat to lower level write to the new chunk...
                                ref->chunk_id = new_chunk_id; //id changed, no need write here, just cheat to lower level write to the new chunk...

                                pheader->chunk_map[prev_chunk_index].is_ref = 0;
                                pheader->chunk_map[prev_chunk_index].chunk_id = chunk_id = new_chunk_id;
                                pheader->chunk_map[prev_chunk_index].vvol_id = lsv_bitmap_get_current_volume(bitmap_context)->bitmap_header->vvol_id;

                                lsv_bitmap_cache_mark_dirty(ref, 0, BITMAP_CHUNK_SIZE / PAGE_SIZE);

                                header_changed = 1;

                                if(root->data_cow_callback)
                                        root->data_cow_callback(cache_buf);
                        }

                        //anyway, one time write...
                        LSV_DBUG("metadata lba:%llu,len:%d->chunk_id:%u,chunk_off:%u\n", (LLU)off,len, chunk_id,chunk_off);
                        ret = lsv_write_bitmap_node(volume_context, bitmap_context, chunk_id, chunk_off,
                                                    DATAOFF_TO_BITMAPOFF(prev_len), (char *)bitmap_buf + DATAOFF_TO_BITMAPOFF(prev_off - off), 0);
                        if(unlikely(ret))
                        {
                                //fatal error.
                                return ret;
                        }

                        prev_chunk_index = chunk_index;

                        prev_off = off + i * LSV_PAGE_SIZE;
                        prev_len = LSV_PAGE_SIZE;
                }
                else
                        prev_len += LSV_PAGE_SIZE;
        }

        if(likely(prev_chunk_index != -1))
        {
                chunk_off = OFFSET_TO_RNDEX(prev_off) * sizeof(struct lsv_bitmap_unit);
                chunk_id = pheader->chunk_map[prev_chunk_index].chunk_id;

                if(unlikely(chunk_id == 0)) //no bitmap allocated.
                {
                        assert(0);
                        lsv_bitmap_cache_unit_t *ref;
                        ret = lsv_bitmap_alloc_chunk(bitmap_context->volume_context, &chunk_id, 1);
                        if(ret)
                        {
                                //fatal error.
                                return ret;
                        }

                        uint8_t *cache_buf = lsv_bitmap_cache_get(bitmap_context, 0, 0, 0,0,1,&ref);
                        if(!cache_buf) {
                                DFATAL("lsv bitmap cache get failed, ret=%d\r\n", errno);
                                return errno;
                        }

                        pheader->chunk_map[prev_chunk_index].is_ref = 0;
                        pheader->chunk_map[prev_chunk_index].chunk_id = chunk_id;
                        pheader->chunk_map[prev_chunk_index].vvol_id = lsv_bitmap_get_current_volume(bitmap_context)->bitmap_header->vvol_id;

                        header_changed = 1;

                        ref->chunk_id  = chunk_id;

                        lsv_bitmap_cache_mark_dirty(ref, 0, BITMAP_CHUNK_SIZE / PAGE_SIZE);
                }
                else if(unlikely(pheader->chunk_map[prev_chunk_index].is_ref))
                {
                        lsv_bitmap_cache_unit_t *ref;
                        uint32_t new_chunk_id = 0;
                        assert(0);
                        uint8_t *cache_buf = lsv_bitmap_cache_get(bitmap_context, pheader->chunk_map[prev_chunk_index].vvol_id, chunk_id, 0,0,1,&ref);
                        if(!cache_buf) {
                                DFATAL("lsv bitmap cache get failed, ret=%d\r\n", errno);
                                return errno;
                        }
                        //assert(cache_buf);

                        ret = lsv_bitmap_alloc_chunk(bitmap_context->volume_context, &new_chunk_id, 0);
                        if(ret)
                        {
                                //fatal error.
                                return ret;
                        }

                        if(pheader->chunk_map[prev_chunk_index].vvol_id && pheader->chunk_map[prev_chunk_index].vvol_id != lsv_bitmap_get_current_volume(bitmap_context)->bitmap_header->vvol_id)
                        {
                                uint64_t vol_id = 0;
                                lsv_bitmap_vvol_to_vol(bitmap_context->volume_context, pheader->chunk_map[prev_chunk_index].vvol_id, &vol_id);

                                LSV_DBUG("volume_proto_remote_read_chunk vold_id=%lld\r\n", (LLU)vol_id);

                                ret = volume_proto_remote_read_chunk(bitmap_context->volume_context, vol_id, chunk_id, 0, CHUNK_SIZE, cache_buf);

                                if (ret) {
                                        LSV_DBUG("volume_proto_remote_read_chunk error %d\r\n", ret);
                                        // fatal error.

                                        lsv_bitmap_free_chunk(bitmap_context->volume_context, chunk_id);
                                        return ret;
                                }

                                LSV_DBUG("volume_proto_remote_read_chunk good\r\n");
                        }

                        LSV_DBUG("bitmap cow chunk_id %u to new_chunk_id %u \r\n", chunk_id, new_chunk_id);

                        //BITMAP_CHCHE_ID(bitmap_context) = chunk_id; //id changed, now cheat to lower level write to the new chunk...
                        ref->chunk_id = new_chunk_id; //id changed, no need write here, just cheat to lower level write to the new chunk...

                        pheader->chunk_map[prev_chunk_index].is_ref = 0;
                        pheader->chunk_map[prev_chunk_index].chunk_id = chunk_id = new_chunk_id;
                        pheader->chunk_map[prev_chunk_index].vvol_id = lsv_bitmap_get_current_volume(bitmap_context)->bitmap_header->vvol_id;

                        lsv_bitmap_cache_mark_dirty(ref, 0, BITMAP_CHUNK_SIZE / PAGE_SIZE);

                        header_changed = 1;

                        if(root->data_cow_callback)
                                        root->data_cow_callback(cache_buf);
                }

                LSV_DBUG("metadata lba:%llu,len:%d->chunk_id:%u,chunk_off:%u\n", (LLU)off,len, chunk_id,chunk_off);
                ret = lsv_write_bitmap_node(volume_context, bitmap_context, chunk_id, (uint32_t)chunk_off, DATAOFF_TO_BITMAPOFF(prev_len),
                                            (char *)bitmap_buf + DATAOFF_TO_BITMAPOFF(prev_off - off), 0);

                if(unlikely(ret)) {
                        DFATAL("write bitmap failed, ret = %d.\r\n", ret);
                        return ret;
                }
        }

        if(header_changed)
        {
                return bitmap_header_save(bitmap_context, SAVE_HEADER_LEVEL_CHUNKS);
        }
        else
                return ret;
}

int private lsv_bitmap_batch_write(void *volume_context, uint64_t off, uint32_t len, void *bitmap_buf)
{
        int ret;
        struct lsv_bitmap_context *root = ((lsv_volume_proto_t *)volume_context)->bitmap_context;
        struct lsv_bitmap_context * node = lsv_bitmap_volume_to_node(volume_context);

        lsv_bitmap_rlock(volume_context);

        LSV_DBUG("node %d %d %d\r\n", node->bitmap_header->chunk_map[0].chunk_id, node->bitmap_header->chunk_map[0].is_ref , node->bitmap_header->chunk_map[0].vvol_id);
        ret = lsv_bitmap_batch_write_node(volume_context, node, off, len, bitmap_buf);

        lsv_bitmap_unlock(volume_context);

        if(! (root->flags & LSV_BITMAP_FLAG_DISABLE_GC))
                lsv_gc_valueup_commit(volume_context);

        return ret;
}

int lsv_bitmap_vvol_to_vol(void *volume_context, uint32_t vvol_id, uint64_t *vol_id)
{
        struct lsv_bitmap_context *vol_node = ((lsv_volume_proto_t *)volume_context)->bitmap_context;
        *vol_id = vol_node->bitmap_header->vvol_map[vvol_id].vol_id;

        //todo, others.
        return 0;
}

static inline int lsv_bitmap_will_cow_node(struct lsv_bitmap_context *bitmap_context, uint64_t off)
{
        uint64_t chunk_index;
        struct lsv_bitmap_header *pheader = BITMAP_HEADER(bitmap_context);

        chunk_index = OFFSET_TO_INDEX(off);

        if(pheader->chunk_map[chunk_index].chunk_id == 0 || pheader->chunk_map[chunk_index].is_ref) //snapshot.
        {
                return 1;
        }

        return 0;
}

int lsv_bitmap_will_cow(void *volume_context, uint64_t off, uint32_t length)
{
        int ret = 0;
        struct lsv_bitmap_context * node = lsv_bitmap_volume_to_node(volume_context);

        // lsv_bitmap_rlock(volume_context);

        for(int i=0;i<length / LSV_PAGE_SIZE;i++)
                ret |= lsv_bitmap_will_cow_node(node, off + i * LSV_PAGE_SIZE);

        // lsv_bitmap_unlock(volume_context);

        return ret;
}

int private lsv_bitmap_do_cow_node(struct lsv_bitmap_context *bitmap_context, uint64_t off)
{
        int ret, header_changed = 0;
        uint32_t new_chunk_id;
        uint64_t chunk_index;
        struct lsv_bitmap_header *pheader = BITMAP_HEADER(bitmap_context);
        //struct lsv_bitmap_context *root = ((lsv_volume_proto_t *)bitmap_context->volume_context)->bitmap_context;

        chunk_index = OFFSET_TO_INDEX(off);

        uint32_t chunk_id = pheader->chunk_map[chunk_index].chunk_id;
        if(pheader->chunk_map[chunk_index].chunk_id == 0) {
                ret = lsv_bitmap_alloc_chunk(bitmap_context->volume_context, &new_chunk_id, 1);
                if (ret) {
                        // fatal error.
                        return ret;
                }

                assert(pheader->chunk_map[chunk_index].chunk_id == 0);
                DINFO("bitmap cow allocated chunk: %d\r\n", new_chunk_id);

                pheader->chunk_map[chunk_index].is_ref = 0;
                pheader->chunk_map[chunk_index].chunk_id = new_chunk_id;
                pheader->chunk_map[chunk_index].vvol_id = lsv_bitmap_get_current_volume(bitmap_context)->bitmap_header->vvol_id;

                header_changed = 1;
        }
        else if(pheader->chunk_map[chunk_index].is_ref) {
                uint32_t new_chunk_id = 0;
                void *bitmap_buf;

                ret = lsv_bitmap_alloc_chunk(bitmap_context->volume_context, &new_chunk_id, 0);
                if (ret) {
                        return ret;
                }

                bitmap_buf = xmalloc(LSV_CHUNK_SIZE);
                ret = lsv_read_bitmap_node(bitmap_context, pheader->chunk_map[chunk_index].vvol_id, chunk_id, 0, LSV_CHUNK_SIZE, bitmap_buf);
                if(ret) {
                        DERROR("lsv_read_bitmap_node failed, err:%d\r\n", ret);
                        lsv_bitmap_free_chunk(bitmap_context->volume_context, chunk_id);
                        return ret;
                }

                if (bitmap_context->data_cow_callback)
                        bitmap_context->data_cow_callback(bitmap_buf);

                ret = lsv_bitmap_write_chunk(bitmap_context->volume_context, new_chunk_id, 0, LSV_CHUNK_SIZE, bitmap_buf);
                if(ret) {
                        DERROR("lsv_read_bitmap_node failed, err:%d\r\n", ret);
                        xfree(bitmap_buf);
                        lsv_bitmap_free_chunk(bitmap_context->volume_context, chunk_id);
                        return ret;
                }

                LSV_DBUG("bitmap cow chunk_id %u to new_chunk_id %u \r\n", chunk_id, new_chunk_id);

                pheader->chunk_map[chunk_index].is_ref = 0;
                pheader->chunk_map[chunk_index].chunk_id = new_chunk_id;
                pheader->chunk_map[chunk_index].vvol_id = lsv_bitmap_get_current_volume(bitmap_context)->bitmap_header->vvol_id;

                header_changed = 1;
        }

        if(header_changed)
                return bitmap_header_save(bitmap_context, SAVE_HEADER_LEVEL_CHUNKS);
        else
                return 0;
}

int lsv_bitmap_do_cow(void *volume_context, uint64_t off, uint32_t length)
{
        int ret = 0;
        struct lsv_bitmap_context * node = lsv_bitmap_volume_to_node(volume_context);

        // lsv_bitmap_rlock(volume_context);

        for(int i=0;i<length / LSV_PAGE_SIZE;i++)
                ret |= lsv_bitmap_do_cow_node(node, off + i * LSV_PAGE_SIZE);

        // lsv_bitmap_unlock(volume_context);

        return ret;
}

typedef struct {

        int (*init)(void *volume_context, uint32_t create_type, uint32_t flag);

        //clone
        int (*init_by)(void *volume_context, uint64_t parent_vol_id, void *buf, uint32_t flags);
        int (*is_ready)(void *volume_context);

        int (*header_read)(struct lsv_bitmap_context *bitmap_context, uint64_t off, uint32_t len, void * buffer);
        int (*header_load)(struct lsv_bitmap_context *bitmap_root, uint32_t first_chunk_id);
        int (*header_save)(struct lsv_bitmap_context *bitmap_context, uint32_t level);

        int (*node_create)(void *volume_context, struct lsv_bitmap_context **bitmap_context, uint8_t is_root, uint32_t type);

        int (*read_batch)(void *volume_context, uint64_t off, uint32_t len, void *bitmap_buf);
        int (*release_batch)(void *volume_context, uint64_t off, uint32_t len);

        int (*write_batch)(void *volume_context, uint64_t off, uint32_t len, void *bitmap_buf);

        int (*need_cow)(void *volume_context, uint64_t off, uint32_t length);
        int (*do_cow)(void *volume_context, uint64_t off, uint32_t length);
} lsv_bitmap_api_t;
